package cn._51doit.flink.day09;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * Joins two keyed streams on event time without dividing them into windows, using Flink's
 * interval join.
 *
 * <p>The first (left) stream is the time reference: a right element with the same key is paired
 * with a left element when {@code left.ts - 1s <= right.ts < left.ts + 1s}
 * (lower bound inclusive, upper bound exclusive).
 *
 * <p>Pipeline built by {@link #main(String[])}:
 * <ol>
 *   <li>Read text lines of the form {@code timestamp,key,value} from two sockets.</li>
 *   <li>Use the first field as the event-time timestamp and parse each line into a
 *       {@code Tuple3<Long, String, String>}.</li>
 *   <li>Key both streams by the second field, so that elements with equal keys are routed to
 *       the same partition.</li>
 *   <li>Interval-join the two keyed streams and print the joined tuples.</li>
 * </ol>
 *
 * <p>Author's note on how an interval join is realised: both streams are keyed by the same
 * condition, their elements are buffered in keyed state, and the two streams are connected so
 * that they can share that state.
 */
public class EventTimeIntervalJoin {

    /** Host both socket sources connect to. */
    private static final String HOST = "localhost";

    /** Port of the socket feeding the left (reference) stream. */
    private static final int LEFT_PORT = 8888;

    /** Port of the socket feeding the right stream. */
    private static final int RIGHT_PORT = 9999;

    /**
     * Builds and runs the interval-join job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Sample input for the left stream (timestamp,key,value):
        //   1000,c1,300
        //   2999,c8,300
        //   3000,c8,300
        //   5000,c2,200
        DataStreamSource<String> lines1 = env.socketTextStream(HOST, LEFT_PORT);

        // Sample input for the right stream (timestamp,key,value):
        //   1200,c1,book
        //   1999,c1,book
        //   2000,c8,phone
        //   5001,c2,furniture
        DataStreamSource<String> lines2 = env.socketTextStream(HOST, RIGHT_PORT);

        // Pairs from the samples above that satisfy  left.ts - 1s <= right.ts < left.ts + 1s
        // for equal keys, in the emitted layout (left.ts, key, left.value, right.ts, right.value):
        //   (1000,c1,300,1200,book)
        //   (1000,c1,300,1999,book)
        //   (2999,c8,300,2000,phone)
        //   (3000,c8,300,2000,phone)
        //   (5000,c2,200,5001,furniture)
        // NOTE(review): what is actually printed may also depend on arrival order relative to
        // the watermark -- confirm against Flink's interval-join late-data rules.
        SingleOutputStreamOperator<Tuple3<Long, String, String>> tpStream1WithWaterMark = parseWithEventTime(lines1);
        SingleOutputStreamOperator<Tuple3<Long, String, String>> tpStream2WithWaterMark = parseWithEventTime(lines2);

        // Key both streams by the same field (f1) so matching keys meet in the same partition.
        KeyedStream<Tuple3<Long, String, String>, String> keyedStream1 = tpStream1WithWaterMark.keyBy(t -> t.f1);
        KeyedStream<Tuple3<Long, String, String>, String> keyedStream2 = tpStream2WithWaterMark.keyBy(t -> t.f1);

        SingleOutputStreamOperator<Tuple5<Long, String, String, Long, String>> res = keyedStream1.intervalJoin(keyedStream2)
                .between(Time.seconds(-1), Time.seconds(1)) // time range relative to the left element
                .upperBoundExclusive() // exclude the upper bound: [lower, upper)
                .process(new JoinedTupleBuilder());

        res.print();

        env.execute();
    }

    /**
     * Assigns event-time timestamps and watermarks to raw text lines and parses them into tuples.
     *
     * <p>Timestamps are taken from the first comma-separated field of each line, using a
     * bounded-out-of-orderness strategy with a bound of zero. Timestamp assignment is applied to
     * the raw lines, before the parsing map, exactly as for both streams in the original job.
     *
     * @param lines stream of lines formatted as {@code timestamp,key,value}
     * @return stream of {@code (timestamp, key, value)} tuples carrying event-time timestamps
     */
    private static SingleOutputStreamOperator<Tuple3<Long, String, String>> parseWithEventTime(DataStream<String> lines) {
        WatermarkStrategy<String> strategy = WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                .withTimestampAssigner(new LineTimestampAssigner());
        return lines.assignTimestampsAndWatermarks(strategy).map(new LineParser());
    }

    /** Extracts the event-time timestamp from the first comma-separated field of a line. */
    private static final class LineTimestampAssigner implements SerializableTimestampAssigner<String> {

        private static final long serialVersionUID = 1L;

        @Override
        public long extractTimestamp(String element, long recordTimestamp) {
            return Long.parseLong(element.split(",")[0]);
        }
    }

    /** Parses a {@code timestamp,key,value} line into a {@code Tuple3<Long, String, String>}. */
    private static final class LineParser implements MapFunction<String, Tuple3<Long, String, String>> {

        private static final long serialVersionUID = 1L;

        @Override
        public Tuple3<Long, String, String> map(String input) throws Exception {
            String[] fields = input.split(",");
            return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]);
        }
    }

    /**
     * Combines a matched left/right pair into
     * {@code (left.ts, key, left.value, right.ts, right.value)}; the key is taken from the left
     * element.
     */
    private static final class JoinedTupleBuilder
            extends ProcessJoinFunction<Tuple3<Long, String, String>, Tuple3<Long, String, String>, Tuple5<Long, String, String, Long, String>> {

        private static final long serialVersionUID = 1L;

        @Override
        public void processElement(Tuple3<Long, String, String> left, Tuple3<Long, String, String> right, Context ctx, Collector<Tuple5<Long, String, String, Long, String>> out) throws Exception {
            out.collect(Tuple5.of(left.f0, left.f1, left.f2, right.f0, right.f2));
        }
    }
}
